package com.tomato.study.redis.stream.consumer.config;

import com.tomato.study.redis.stream.consumer.listener.RedisStreamAckListener;
import com.tomato.study.redis.stream.consumer.listener.RedisStreamListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

import javax.annotation.Resource;

import static com.tomato.study.redis.stream.core.RedisConstants.*;

/**
 * redis stream 配置（redis5.0以上）
 *
 * @author lizhifu
 * @date 2022/5/11
 */
@Configuration
public class RedisStreamConfig {

    @Resource
    private RedisStreamAckListener redisStreamAckListener;

    @Resource
    private RedisStreamListener redisStreamListener;


    /**
     * 订阅者1，消费组group1，收到消息后自动确认，与订阅者2为竞争关系，消息仅被其中一个消费
     * @param streamMessageListenerContainer
     * @return
     */
    @Bean
    public Subscription subscription(StreamMessageListenerContainer streamMessageListenerContainer){
        // 自动确认
        Subscription subscription = streamMessageListenerContainer.receiveAutoAck(
                Consumer.from(REDIS_STREAM1_GROUP1,REDIS_STREAM1_GROUP1_CONSUMER1),
                StreamOffset.create(REDIS_STREAM1, ReadOffset.lastConsumed()),
                redisStreamListener
        );
        return subscription;
    }

    /**
     * 订阅者2，消费组group1，收到消息后自动确认，与订阅者1为竞争关系，消息仅被其中一个消费
     * @param streamMessageListenerContainer
     * @return
     */
    @Bean
    public Subscription subscription2(StreamMessageListenerContainer streamMessageListenerContainer){
        // 自动确认
        Subscription subscription = streamMessageListenerContainer.receiveAutoAck(
                Consumer.from(REDIS_STREAM1_GROUP1,REDIS_STREAM1_GROUP1_CONSUMER2),
                StreamOffset.create(REDIS_STREAM1, ReadOffset.lastConsumed()),
                redisStreamListener
        );
        return subscription;
    }

    /**
     * 订阅者3，消费组group2，收到消息后不自动确认，需要用户选择合适的时机确认，
     * 与订阅者1和2非竞争关系，即使消息被订阅者1或2消费，亦可消费
     *
     * 当某个消息被ACK，PEL列表就会减少
     * 如果忘记确认（ACK），则PEL列表会不断增长占用内存
     * 如果服务器发生意外，重启连接后将再次收到PEL中的消息ID列表
     * @param streamMessageListenerContainer
     * @return
     */
    @Bean
    public Subscription subscription3(StreamMessageListenerContainer streamMessageListenerContainer){
        // 非自动确认
        Subscription subscription = streamMessageListenerContainer.receive(
                Consumer.from(REDIS_STREAM1_GROUP2,REDIS_STREAM1_GROUP2_CONSUMER1),
                StreamOffset.create(REDIS_STREAM1, ReadOffset.lastConsumed()),
                redisStreamAckListener
        );
        return subscription;
    }
}
